refactor(gateway): pure-C++ managers via Transport adapters + discovery split + graph-event refresh#397
refactor(gateway): pure-C++ managers via Transport adapters + discovery split + graph-event refresh#397bburda wants to merge 31 commits into
Conversation
7b0a134 to
8397e1c
Compare
Extracts ServiceCallResult, the ActionGoalStatus enum + free action_status_to_string helper, the four Action*Result structs, and ActionGoalInfo into a dedicated header under core/operations/. The types were already neutral C++ but lived behind the rclcpp-pulling operation_manager.hpp include path, which prevented the neutral build layer from consuming them. The OperationManager header forwards to the new location; the helper definition moves alongside into a small core translation unit so the symbol is provided by gateway_core.
…ation Extracts ParameterErrorCode and ParameterResult into a dedicated header under core/configuration/. The structured-error contract becomes available to the neutral build layer; the ConfigurationManager header forwards to the new location.
Extracts FaultResult, the JSON-bag outcome shared by seven of the eight fault-manager methods, into a dedicated header under core/faults/. The existing FaultWithEnvResult still exposes raw message types and stays inside fault_manager.hpp until the transport extraction moves the message-to-JSON conversion to the adapter layer.
…sports Defines seven abstract ports - TopicTransport, ServiceTransport, ActionTransport, ParameterTransport, FaultServiceTransport, LogSource, TopicSubscriptionTransport - that the manager refactors will route through. Each port carries the same operations the corresponding manager performs today, expressed in pure C++ types (already-neutral result structs from core/operations, core/configuration, core/faults). The smoke test extends to assert each interface is abstract and reachable from the gateway_core build layer with no ament dependency on the link line. Adapter implementations land under src/ros2/transports/ in subsequent phases. Two name choices avoid collisions with structs that currently live in the ROS-coupled headers: * TopicTransport::sample returns TopicSample rather than TopicSampleResult, because core/data/data_types.hpp already defines a same-named struct with a different shape (per-topic batch errors, endpoint QoS) for the topic-data-provider plugin surface. * FaultServiceTransport::get_fault_with_env returns FaultWithEnvJsonResult rather than FaultWithEnvResult, because the legacy ros2_medkit_msgs-typed FaultWithEnvResult still lives next to the FaultManager facade. The two will be reconciled when the FaultManager migration lands. TopicSubscriptionHandle is a polymorphic RAII base (virtual destructor only) rather than a fully abstract port, so the smoke assertion uses sizeof > 0 instead of std::is_abstract_v.
…pter The data-access manager body becomes pure C++. All rclcpp use - generic publisher cache, JSON serializer instance, native sample backend, type introspection - moves into the new Ros2TopicTransport adapter under src/ros2/transports/. The manager now takes a shared_ptr<TopicTransport> plus the sample timeout and routes publish / sample / native-sample through it. The adapter exposes the type-introspection helper through the transport interface so handlers retain their existing accessor. The handler-facing public API (publish_to_topic, get_topic_sample_with_ fallback, get_topic_sample_native, get_type_introspection, set_topic_data_provider, get_topic_data_provider, get_topic_sample_ timeout) is preserved verbatim. The legacy include path ros2_medkit_gateway/data_access_manager.hpp is preserved as a forwarding shim. New mock-transport tests link only against gateway_core + GTest, proving the manager logic compiles without rclcpp on the link line.
…sports OperationManager retains all goal-tracking, UUID generation, validation, and component-namespace resolution logic in pure C++. The rclcpp side - GenericServiceClient cache, action-internal-service clients, the GoalStatusArray subscription cache - moves into two new adapters, Ros2ServiceTransport and Ros2ActionTransport, under src/ros2/transports/. The manager registers a per-goal status callback at subscribe time so the action transport pushes goal-status updates back into the manager's tracking map on its own thread. Goal-status array decoding moves into the transport; the manager only consumes the typed (path, goal_id, status) tuples. The handler-facing public API (call_service, call_component_service, send_action_goal, send_component_action_goal, cancel_action_goal, get_action_result, list_tracked_goals, update_goal_status, update_goal_feedback, cleanup_old_goals, subscribe_to_action_status, unsubscribe_from_action_status, the four static is_*-type helpers, the UUID helpers) is preserved. The component-name resolver dependency moves behind a new ServiceActionResolver port that DiscoveryManager now implements; this keeps the manager translation unit out of discovery_manager.hpp's rclcpp transitive include chain. Mock-transport tests link only against gateway_core + GTest, exercising 13 routing / lifecycle scenarios.
ConfigurationManager loses its rclcpp::SyncParametersClient cache, the rclcpp::Parameter defaults cache, the negative-cache for unreachable nodes, and the spin_mutex serialising parameter-client spins. All of these move into the new Ros2ParameterTransport adapter under src/ros2/transports/. The adapter also owns the JSON <-> rclcpp:: ParameterValue conversion helpers and the gateway-own-FQN check that previously lived as private members of the manager. Manager retains: per-entity reset orchestration (reset_parameter and reset_all_parameters compose set_parameter with transport-supplied defaults via the new get_default and list_defaults methods on ParameterTransport), the self-node short circuit (delegated to the transport's is_self_node), and the shutdown ordering contract (idempotent manager.shutdown() fans out to transport.shutdown() before rclcpp::shutdown()). The handler-facing public API (list_parameters, get_parameter, set_parameter, reset_parameter, reset_all_parameters, shutdown) is preserved verbatim; the legacy include path ros2_medkit_gateway/configuration_manager.hpp is preserved as a forwarding shim. Parameter-service tuning parameters (parameter_service_timeout_sec, parameter_service_negative_cache_sec) are declared at gateway_node wiring time and passed to the transport constructor. ParameterTransport gains two new pure-virtual methods (get_default, list_defaults) that surface the cached defaults as neutral JSON for manager-level reset orchestration. The gateway_core link-time smoke test now also pins ConfigurationManager. Mock-transport tests link exclusively against gateway_core + GTest, with no rclcpp on the link line, covering CRUD delegation, self-node short-circuit, reset via cached defaults, partial-failure aggregation in reset_all_parameters, and shutdown idempotency.
FaultManager loses its seven rclcpp::Client members and their seven
per-client mutexes; all move into the new Ros2FaultServiceTransport
adapter under src/ros2/transports/. The adapter performs the
ros2_medkit_msgs to JSON conversion internally and returns the neutral
FaultResult / FaultWithEnvJsonResult structures, so the manager body now
lives in the ROS-free build layer (gateway_core).
The handler-facing public API (report_fault, list_faults, get_fault,
get_fault_with_env, clear_fault, get_snapshots, get_rosbag,
list_rosbags, is_available, wait_for_services) is preserved verbatim.
The single behaviour change is that get_fault_with_env now returns
the response body as JSON: data carries { "fault": {...},
"environment_data": {...} } already converted by the transport. The
single handler call site (build_sovd_fault_response) is updated to
consume the JSON, post-processing rosbag snapshots to add the per-
request bulk_data_uri and freeze_frame snapshots to extract the
primary value.
The previously static FaultManager::fault_to_json helper is removed.
Two external call sites (SSEFaultHandler, TriggerFaultSubscriber)
subscribe directly to the FaultEvent topic and convert the message
themselves; they now use the small ros2/conversions/fault_msg_conversions
module that lives at the ROS-coupled boundary alongside the transport.
The legacy include path ros2_medkit_gateway/fault_manager.hpp is
preserved as a forwarding shim. New mock-transport tests link only
against gateway_core + GTest, proving the manager logic compiles
without rclcpp on the link line.
LogManager keeps its ring-buffer storage, per-entity config map, monotonic id counter, plugin observer pattern, and the manages_ingestion short-circuit - all pure C++. The /rosout subscription and the rcl_interfaces::msg::Log to LogEntry conversion move into the new Ros2LogSource adapter under src/ros2/transports/. The adapter emits LogEntry through a callback the manager registers at start() time; when the primary LogProvider declares full ingestion, the manager never calls start() (matching today's behaviour of not creating the subscription at all in that mode). To let the manager body live in gateway_core alongside the other managers, the PluginManager pointer is replaced with a thin LogProviderRegistry port (primary_log_provider + log_observers). PluginManager implements the port inline, so production wiring is unchanged. The handler-facing public API (get_logs, get_config, update_config, add_log_entry, set_notifier, set_node_to_entity_resolver, the static helpers, inject_entry_for_testing) is preserved verbatim. Mock-source tests link only against gateway_core + GTest, proving the manager logic compiles without rclcpp on the link line. The legacy include path ros2_medkit_gateway/log_manager.hpp is preserved as a forwarding shim.
…sport TriggerManager keeps all its condition-evaluation, retry-unresolved, sweep-orphaned, persistence, dispatch-index, and entity-cache logic in pure C++. The pointer to TriggerTopicSubscriber is replaced with a shared TopicSubscriptionTransport interface; the new Ros2TopicSubscriptionTransport adapter wraps the existing TriggerTopicSubscriber, preserving its subscription-destructor pattern. Per-trigger subscription handles live in the manager's tracking map - destruction unsubscribes - so trigger lifetime fully drives subscription lifetime. The trigger manager source moves to src/core/managers/ so it is picked up by gateway_core's GLOB. TriggerTopicSubscriber moves to include/ros2_medkit_gateway/ros2/ and src/ros2/, and a forwarding shim at the old header path keeps existing includes working. TriggerTopicSubscriber itself becomes a generic per-handle subscription executor: subscribe(topic, type, handle_key, callback) creates one GenericSubscription per key, unsubscribe(handle_key) tears it down, and pending/retry semantics are preserved with the same 60s timeout. The handler-facing public API (create / get / list / update / remove, wait_for_event, consume_pending_event, set_on_removed, set_entity_children_fn, set_entity_exists_fn, set_resolve_topic_fn, retry_unresolved_triggers, sweep_orphaned_triggers, load_persistent_triggers) is preserved verbatim. Mock-transport tests link only against gateway_core + GTest, proving the manager body compiles without rclcpp on the link line.
RuntimeDiscoveryStrategy is replaced by Ros2RuntimeIntrospection, which implements the existing IntrospectionProvider interface used by the merge pipeline for plugin-driven entity discovery. Built-in ROS graph queries are now treated identically to plugin-provided introspection - one chain, one merge policy. HybridDiscoveryStrategy is removed; the equivalent runtime + manifest + plugin merge is the default MergePipeline configuration. The discovery_mode parameter (runtime_only / manifest_only / hybrid) now controls which layers the merge pipeline activates rather than which strategy class to instantiate.
The class is the rosidl_typesupport_cpp + rosidl_typesupport_introspection_cpp
bridge built on top of JsonSerializer; it semantically belongs alongside the
rest of the rosidl glue rather than in the gateway. Gateway packages now
depend on the serialization library for type schemas instead of duplicating
the introspection backend.
- Move type_introspection.{hpp,cpp} to ros2_medkit_serialization (preserves
Apache 2.0 header verbatim).
- Migrate the namespace from ros2_medkit_gateway to ros2_medkit_serialization;
qualify all consumer call sites (gateway managers, transports, providers,
handlers, discovery, tests).
- Update forward declarations in topic_transport.hpp,
data_access_manager.hpp, discovery_manager.hpp.
- Move the TypeIntrospection unit suite from
src/ros2_medkit_gateway/test/test_data_access_manager.cpp to
src/ros2_medkit_serialization/test/test_type_introspection.cpp; register
the new test target.
- Drop src/type_introspection.cpp from the gateway gateway_ros2 source list.
The rosidl_typesupport_cpp / rosidl_typesupport_introspection_cpp deps remain
in the gateway because the GenericClient compat shim still uses them.
Replace the cyclic wall-timer loop that called refresh_cache() every refresh_interval_ms with an event-driven design: - A 100 ms wall timer polls rclcpp::Node::get_graph_event()->check_and_clear() and runs refresh_cache() only when the ROS 2 graph actually changed (node up/down, topic/service/action add or remove). On a stable graph the callback is a single atomic check. - A second wall timer at refresh_interval_ms acts as a safety backstop, forcing an unconditional refresh so liveness is preserved if a graph event is ever missed. Semantics of the existing refresh_interval_ms parameter shift from "primary refresh interval" to "safety-backstop interval", and the default moves from 10 s (gateway_params.yaml) / 2 s (gateway.launch.py) to 30 s. The validation range stays 100-60000 ms; existing test overrides at 1000 ms continue to work, just as a tighter backstop. Test impact: - New integration test verifies that a node spawned mid-run appears in /apps within 5 s while the gateway runs with a 30 s backstop, proving the graph-event poll path is what drives the refresh. - test_operation_handlers seeds the entity cache directly and used to rely on refresh_cache() never firing during the 1 s settle period. Graph events from the test executor now do trigger refreshes; seed after the settle so the test's manually-injected component wins. Design intent on idle CPU: the previous timer woke and ran the full refresh_cache() pipeline every refresh_interval_ms regardless of whether the graph had changed. With this change an idle gateway runs only the 100 ms graph-event check (a single atomic load) plus the 30 s backstop. Documentation updated: README.md parameter tables, gateway_params.yaml comments, design/aggregation.rst.
Persistent triggers loaded at startup were being removed before DDS discovery had populated the entity cache. The graph-event refresh path fires on every node up/down event, including the cold-start window where the cache reflects only a partial view of the ROS 2 graph. Running the orphan sweep on every graph event treated restored triggers as orphans because their target entities had not yet been seen by this gateway, removing them from both memory and the persistent SQLite store. Move the sweep call to the backstop timer only. The backstop runs at the configured refresh cadence (default 30 s, 1 s in tests), giving DDS time to converge before any orphan check. The graph-event path keeps doing the cheap cache refresh, so spawn-detection latency is unchanged.
Each test in the suite connects to a non-existent OPC UA host and waits about 3.8 s for the DNS / TCP failure path. With 13 tests the wallclock run requires roughly 90 s. The ament_add_gtest default timeout of 60 s truncated the run mid-suite, which CTest then surfaced as a "missing_result" error under heavy parallelism (visible in concurrent colcon test invocations on multi-core hosts). Set TIMEOUT 240 to give a comfortable margin without hiding genuine hangs.
The neutral-managers refactor moved DataAccessManager, OperationManager, ConfigurationManager, FaultManager, LogManager, and TriggerManager into ``gateway_core`` and gave each one a Transport interface for ROS interaction. Update the layered-library note to reflect that placement and the new neutral interfaces. TriggerTopicSubscriber became a generic per-handle subscription executor wrapped by Ros2TopicSubscriptionTransport. Update its class summary, component list, and the TriggerManager arrow on the class diagram so they describe the new per-trigger handle ownership rather than the old reference-counted shared-subscription model.
Apply clang-tidy fixes across the files this branch already modifies so
the incremental clang-tidy job (which scans every file changed in a PR)
stays clean.
- ``fault_handlers.cpp``: replace temp-allocating ``operator+`` chain in
``bulk_data_uri`` construction with explicit ``operator+=`` /
``std::move`` (performance-inefficient-string-concatenation).
- ``test_configuration_manager.cpp``: pre-allocate ``threads.reserve(...)``
before ``emplace_back`` loops (performance-inefficient-vector-operation).
- ``test_configuration_manager_routing.cpp``,
``test_data_access_manager_routing.cpp``,
``test_operation_manager_routing.cpp``,
``test_operation_handlers.cpp``: name unused parameters in mock and
override signatures (readability-named-parameter,
misc-unused-parameters).
- ``test_fault_handlers.cpp``: switch single-character
``std::string::find("X")`` calls to ``find('X')``
(performance-faster-string-find).
- ``test_operation_handlers.cpp``: replace ``std::bind(...)`` with
forwarding lambdas (modernize-avoid-bind), and pass shared_ptr
arguments by const-reference instead of by value
(performance-unnecessary-value-param).
- ``test_trigger_manager_routing.cpp``: use ``empty()`` instead of
``size()`` in a boolean context (readability-container-size-empty).
After these changes ``clang-tidy -p build`` is clean on every cpp this
branch touches.
Follow-up to the earlier clang-tidy fixup. After clang-format wrapped the goal-callback lambda onto its own line, clang-tidy re-evaluated the parameter list and re-flagged ``goal`` as a copied ``std::shared_ptr`` only used as a const reference. Switch the parameter to ``const ...&`` to match the rest of the action lambdas in the file.
…hook clang-tidy prints a 'X warnings generated. Suppressed Y' summary plus source snippets on stderr even when every diagnostic is filtered out. Streaming that for every clean file across a 100-file pre-push run fills pre-commit's output buffer; the framework then fails with BlockingIOError on sys.stdout.buffer.write and the push aborts before git ever opens the connection. Capture clang-tidy stdout+stderr per file and only emit it on non-zero exit. Add --quiet to drop the progress lines that clang-tidy keeps even when there are no diagnostics. Effect: clean files produce zero output, so the framework never has a multi-megabyte buffer to flush; only files with real findings produce output, exactly as before.
…ndant c_str Two pre-existing clang-tidy findings on the test fixture surface in the pre-push hook now that the gateway PR brings the affected file into scope: - LocalHttpServer owns a std::thread and a raw httplib::Server pointer and declares a non-default destructor, so the project's cppcoreguidelines-special-member-functions check (with AllowSoleDefaultDtor) demands all five special members. Mark copy/move as = delete since the wrapped server cannot be safely duplicated. - The httplib::Server::Get overload accepts a std::string by const reference, so the .c_str() conversion is redundant (readability-redundant-string-cstr).
9bae856 to
efd9e3e
Compare
…ager
Two pre-existing clang-tidy findings on the test fixture surface in the
pre-push hook now that the routing refactor brings the affected file
into scope:
- The local using json = nlohmann::json shadows the namespace-scope
alias the manager+transport headers already export; rely on the
inherited one (clang-diagnostic-shadow).
- std::string::find("T") is the const-char-pointer overload; the
single-character literal calls for find('T')
(performance-faster-string-find).
…dentation docutils 0.21 reads the second line of a wrapped sub-bullet as a new indentation block (build-docs CI fails with -W under sphinx 8). Collapse the wrap to a single line so the bullet text stays in the parent list. Affects only line 466 of the design index; no semantic content change.
…stripped libcpp-httplib TSan races Two CI failures uncovered after the rebase to the graph-event-driven discovery refresh: ASan failure on OperationHandlersFixtureTest.ListExecutionsReturnsTrackedActionGoal: Sending an action goal subscribes to the action's feedback/result topics, which the gateway's graph-event refresh observes and uses to rewrite the entity cache from its (empty) own discovery view, wiping the manually seeded engine component. Under ASan the refresh has more wall-clock to land between create_action_execution() returning and the subsequent handle_list_executions() call. Re-seed the cache from the helper after handle_create_execution so any subsequent handler call still sees the engine entity. TSan races in libcpp-httplib.so triggered by daisy-chain aggregation: The reported races live in cpp-httplib's own listen / accept / response paths between two server threads spawned from RESTServer::start. An existing race:httplib::* suppression covers symbolised frames; the system-packaged .so however is shipped stripped, so the TSan stack ends in <null> frames inside libcpp-httplib.so.0.14 and the symbolic suppression no longer matches. Add a called_from_lib: rule for both the versioned and unversioned soname so the stripped frames are silenced the same way upstream-symbolised builds were.
There was a problem hiding this comment.
Pull request overview
Refactors the gateway so SOVD managers and related business logic live in the ROS-free gateway_core layer behind neutral Transport interfaces, with ROS 2 specifics moved into gateway_ros2 adapters. This improves unit-test isolation (no rclcpp link dependency for manager tests), reshapes runtime discovery to flow through the IntrospectionProvider chain, and switches refresh behavior to primarily graph-event-driven with a periodic safety backstop.
Changes:
- Introduces/expands neutral Transport ports (topic/service/action/parameter/fault/log/trigger-subscription) and routes core managers through them, enabling
gateway_core-only unit tests with mock transports. - Reworks discovery to use
Ros2RuntimeIntrospection(as anIntrospectionProvider) and updates docs/config defaults for graph-event-driven refresh + backstop timer. - Relocates TypeIntrospection into
ros2_medkit_serializationand updates gateway call sites/tests accordingly.
Reviewed changes
Copilot reviewed 107 out of 107 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| tsan_suppressions.txt | Expands TSan suppressions to match stripped libcpp-httplib.so frames by library name. |
| scripts/clang-tidy-diff.sh | Suppresses “clean file” clang-tidy output unless failures occur (pre-commit friendliness). |
| scripts/check_no_naked_subscriptions.sh | Updates allowed legacy files list to reflect ROS2 adapters moved under src/ros2/. |
| src/ros2_medkit_serialization/CMakeLists.txt | Adds TypeIntrospection source + new gtest for it. |
| src/ros2_medkit_serialization/include/ros2_medkit_serialization/type_introspection.hpp | Moves TypeIntrospection into ros2_medkit_serialization namespace and updates serializer member type. |
| src/ros2_medkit_serialization/src/type_introspection.cpp | Updates include/namespace and exception types after relocation into serialization package. |
| src/ros2_medkit_serialization/test/test_type_introspection.cpp | Adds unit tests covering schema/template/info retrieval and caching behavior. |
| src/ros2_medkit_plugins/ros2_medkit_opcua/CMakeLists.txt | Increases test_opcua_plugin timeout to 240s to avoid CI truncation. |
| src/ros2_medkit_plugins/ros2_medkit_graph_provider/test/test_graph_provider_plugin.cpp | Fixes httplib route registration call style and makes LocalHttpServer non-copyable/movable for RAII safety. |
| src/ros2_medkit_gateway/CMakeLists.txt | Rewires gateway_ros2 sources (adds ROS2 adapters; removes managers moved to core) and adds gateway_core-only routing tests. |
| src/ros2_medkit_gateway/config/gateway_params.yaml | Updates refresh semantics docs + default refresh_interval_ms for safety-backstop mode. |
| src/ros2_medkit_gateway/launch/gateway.launch.py | Updates launch arg description to clarify safety-backstop refresh interval meaning. |
| src/ros2_medkit_gateway/launch/gateway_https.launch.py | Same as above for HTTPS launch file. |
| src/ros2_medkit_gateway/README.md | Updates documented default and semantics for refresh_interval_ms after graph-event refresh change. |
| src/ros2_medkit_gateway/design/index.rst | Updates design docs for managers-in-core + transport adapters + discovery class diagram changes. |
| src/ros2_medkit_gateway/design/aggregation.rst | Updates aggregation health-check description to match graph-event refresh + backstop timer. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/gateway_node.hpp | Adds members for ROS2 transport adapters and graph-event/backstop timers; updates refresh locking comments. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/discovery/discovery_manager.hpp | Refactors discovery manager to own pipeline + cached result, implement ServiceActionResolver, and use Ros2RuntimeIntrospection. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/discovery/merge_pipeline.hpp | Updates thread-safety warnings to refer to DiscoveryManager’s caching/mutexing. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/core/discovery/discovery_strategy.hpp | Marks DiscoveryStrategy as legacy, with discovery now driven by providers/pipeline. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/core/discovery/service_action_resolver.hpp | Adds a neutral resolver interface to decouple OperationManager from DiscoveryManager headers. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/core/discovery/layers/runtime_layer.hpp | Switches RuntimeLayer dependency to Ros2RuntimeIntrospection. |
| src/ros2_medkit_gateway/src/discovery/layers/runtime_layer.cpp | Implements RuntimeLayer changes using Ros2RuntimeIntrospection. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/ros2/providers/ros2_runtime_introspection.hpp | Introduces ROS2 graph introspection provider implementing IntrospectionProvider + direct query helpers. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/core/transports/topic_transport.hpp | Adds TopicTransport port + TopicSample neutral result struct. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/core/transports/service_transport.hpp | Adds ServiceTransport port for generic service calls returning JSON. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/core/transports/action_transport.hpp | Adds ActionTransport port for action send/cancel/get_result + status subscription callbacks. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/core/transports/parameter_transport.hpp | Adds ParameterTransport port for parameter CRUD + cached defaults + shutdown/invalidate semantics. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/core/transports/fault_service_transport.hpp | Adds FaultServiceTransport port returning neutral JSON-shaped fault results. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/core/transports/log_source.hpp | Adds LogSource port for /rosout-like log ingestion. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/core/transports/topic_subscription_transport.hpp | Adds TopicSubscriptionTransport port + RAII TopicSubscriptionHandle for trigger data subscriptions. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/ros2/transports/ros2_service_transport.hpp | Declares ROS2 adapter implementing ServiceTransport with generic-client caching. |
| src/ros2_medkit_gateway/src/ros2/transports/ros2_service_transport.cpp | Implements ServiceTransport via generic clients + serialization. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/ros2/transports/ros2_action_transport.hpp | Declares ROS2 adapter implementing ActionTransport (generic internal service clients + status subs). |
| src/ros2_medkit_gateway/src/ros2/transports/ros2_action_transport.cpp | Implements ActionTransport via generic internal action services and status subscriptions. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/ros2/transports/ros2_parameter_transport.hpp | Declares ROS2 adapter implementing ParameterTransport with client/default/negative caches and spin mutex. |
| src/ros2_medkit_gateway/src/ros2/transports/ros2_parameter_transport.cpp | Implements ParameterTransport via SyncParametersClient + JSON conversions + caches. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/ros2/transports/ros2_fault_service_transport.hpp | Declares ROS2 adapter implementing FaultServiceTransport (7 service clients + per-client mutexes). |
| src/ros2_medkit_gateway/src/ros2/transports/ros2_fault_service_transport.cpp | Implements Fault service calls and message-to-JSON conversions in the adapter. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/ros2/transports/ros2_log_source.hpp | Declares ROS2 adapter implementing LogSource via /rosout subscription. |
| src/ros2_medkit_gateway/src/ros2/transports/ros2_log_source.cpp | Implements /rosout ingestion + callback wiring for core LogManager. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/ros2/transports/ros2_topic_transport.hpp | Declares ROS2 adapter implementing TopicTransport and owning TypeIntrospection. |
| src/ros2_medkit_gateway/src/ros2/transports/ros2_topic_transport.cpp | Implements Topic publish/sample/count and exposes TypeIntrospection to core. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/ros2/transports/ros2_topic_subscription_transport.hpp | Declares ROS2 adapter implementing TopicSubscriptionTransport by wrapping TriggerTopicSubscriber. |
| src/ros2_medkit_gateway/src/ros2/transports/ros2_topic_subscription_transport.cpp | Implements TopicSubscriptionTransport subscribe() returning RAII handles for deterministic unsubscribe. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/trigger_topic_subscriber.hpp | Converts legacy include to a backwards-compat shim pointing at the new ros2 header. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/ros2/trigger_topic_subscriber.hpp | Introduces new per-handle TriggerTopicSubscriber API (no ref-counted per-topic sharing). |
| src/ros2_medkit_gateway/src/ros2/trigger_topic_subscriber.cpp | Implements the per-handle trigger subscription executor + retry behavior. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/core/operations/operation_types.hpp | Introduces ROS-neutral operation result structs/enums for services/actions. |
| src/ros2_medkit_gateway/src/core/operations/operation_types.cpp | Implements action_status_to_string() helper. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/core/managers/data_access_manager.hpp | Adds core DataAccessManager using TopicTransport. |
| src/ros2_medkit_gateway/src/core/managers/data_access_manager.cpp | Implements publish/sample logic and the “no publishers => metadata_only” fast path. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/core/managers/configuration_manager.hpp | Adds core ConfigurationManager using ParameterTransport + reset orchestration. |
| src/ros2_medkit_gateway/src/core/managers/configuration_manager.cpp | Implements shutdown semantics and reset operations via cached defaults. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/core/configuration/parameter_types.hpp | Introduces ParameterResult + structured error codes in core. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/core/faults/fault_types.hpp | Introduces core FaultResult / FaultWithEnvJsonResult JSON-shaped contracts. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/core/managers/fault_manager.hpp | Adds core FaultManager delegating to FaultServiceTransport. |
| src/ros2_medkit_gateway/src/core/managers/fault_manager.cpp | Implements FaultManager as pure delegation layer over the transport. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/fault_manager.hpp | Converts legacy include to a backwards-compat shim pointing at core manager. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/data_access_manager.hpp | Converts legacy include to a backwards-compat shim pointing at core manager. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/core/providers/log_provider_registry.hpp | Introduces LogProviderRegistry port so LogManager can depend on a neutral interface. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/core/plugins/plugin_manager.hpp | Makes PluginManager implement LogProviderRegistry via thin forwards. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/core/managers/trigger_manager.hpp | Updates TriggerManager to use TopicSubscriptionTransport + per-trigger RAII handles instead of TriggerTopicSubscriber*. |
| src/ros2_medkit_gateway/src/http/handlers/data_handlers.cpp | Updates includes to use relocated TypeIntrospection. |
| src/ros2_medkit_gateway/src/http/handlers/operation_handlers.cpp | Updates includes to use relocated TypeIntrospection. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/http/handlers/fault_handlers.hpp | Changes build_sovd_fault_response() signature to consume transport-supplied JSON instead of ROS messages. |
| src/ros2_medkit_gateway/src/http/handlers/fault_handlers.cpp | Reworks SOVD fault response building to post-process transport JSON and construct per-request bulk_data_uri. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/ros2/conversions/fault_msg_conversions.hpp | Adds ROS-boundary conversion helpers for Fault/EnvironmentData -> JSON. |
| src/ros2_medkit_gateway/src/ros2/conversions/fault_msg_conversions.cpp | Implements the fault/environment-data JSON conversions used by SSE + triggers + fault transport. |
| src/ros2_medkit_gateway/src/http/handlers/sse_fault_handler.cpp | Switches to shared fault JSON conversion helper (no longer depends on FaultManager::fault_to_json). |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/trigger_fault_subscriber.hpp | Updates docs to refer to new fault conversion helper. |
| src/ros2_medkit_gateway/src/trigger_fault_subscriber.cpp | Switches to shared fault JSON conversion helper. |
| src/ros2_medkit_gateway/src/openapi/schema_builder.hpp | Updates comment to reference new fault conversion helper. |
| src/ros2_medkit_gateway/test/test_gateway_core_smoke.cpp | Extends core-only smoke test to include core managers + transport ports + resolver interface. |
| src/ros2_medkit_gateway/test/test_data_access_manager_routing.cpp | Adds gateway_core-only routing test via MockTopicTransport. |
| src/ros2_medkit_gateway/test/test_configuration_manager.cpp | Updates tests to construct ConfigurationManager via Ros2ParameterTransport adapter. |
| src/ros2_medkit_gateway/test/test_operation_manager.cpp | Updates tests to construct OperationManager with Ros2 service/action transport adapters. |
| src/ros2_medkit_gateway/test/test_operation_handlers.cpp | Updates action server callbacks and fixes cache seeding timing with graph-event refresh behavior. |
| src/ros2_medkit_gateway/test/test_fault_manager.cpp | Updates tests to construct FaultManager via Ros2FaultServiceTransport adapter. |
| src/ros2_medkit_gateway/test/test_log_manager.cpp | Updates tests to construct LogManager via Ros2LogSource adapter. |
| src/ros2_medkit_gateway/test/test_runtime_discovery.cpp | Updates runtime discovery tests to use Ros2RuntimeIntrospection and introspect() behavior. |
| src/ros2_medkit_gateway/test/test_trigger_manager.cpp | Minor tidy changes (removes unused alias; uses char overload of find()). |
| src/ros2_medkit_gateway/src/discovery/hybrid_discovery.cpp | Removes HybridDiscoveryStrategy implementation (replaced by DiscoveryManager pipeline caching). |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/discovery/hybrid_discovery.hpp | Removes HybridDiscoveryStrategy header (replaced by pipeline caching in DiscoveryManager). |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/discovery/runtime_discovery.hpp | Removes RuntimeDiscoveryStrategy header (replaced by Ros2RuntimeIntrospection). |
| void Ros2LogSource::stop() { | ||
| // Mark shutdown so any in-flight callback short-circuits before touching | ||
| // members that are about to destruct. | ||
| shutdown_requested_.store(true, std::memory_order_release); | ||
|
|
|
|
||
| if (!client->wait_for_service(std::chrono::seconds(5))) { | ||
| result.success = false; | ||
| result.error_message = "Service not available: " + service_path; | ||
| return result; |
| Ros2ServiceTransport::Ros2ServiceTransport(rclcpp::Node * node) | ||
| : node_(node), serializer_(std::make_shared<ros2_medkit_serialization::JsonSerializer>()) { | ||
| RCLCPP_INFO(node_->get_logger(), "Ros2ServiceTransport initialised (native serialization)"); | ||
| } |
|
|
||
| if (!clients.send_goal_client->wait_for_service(std::chrono::seconds(5))) { | ||
| result.error_message = "Action server not available: " + action_path; | ||
| return result; | ||
| } |
Five issues surfaced from a multi-agent self-review on the Transport
refactor:
Ros2ActionTransport::generate_uuid - static thread_local engine:
The translation unit is part of gateway_lib which links into
test_gateway_plugin.so (a CMake MODULE). Per CLAUDE.md, thread_local
storage in MODULE-target code uses initial-exec TLS (TPOFF32) which is
incompatible with shared objects and can crash at load time on
toolchains that do not patch it. Replace with a mutex-guarded static
std::mt19937 - UUID generation is one-per-goal so contention is
irrelevant.
Ros2FaultServiceTransport::list_rosbags - parallel-array UB:
Indexed file_paths[i], formats[i], durations_sec[i], sizes_bytes[i] by
fault_codes.size() without validating that all five vectors have the
same length. A server bug or schema drift on the remote ListRosbags
service would have produced a UB read past end-of-vector. Compute the
shortest length, surface the mismatch as a FaultResult error, and only
index the validated range.
TriggerTopicSubscriber::subscribe - silent failure on rclcpp throw:
Underlying rclcpp subscription failures were logged but the function
returned void. Ros2TopicSubscriptionTransport then bookkept a handle
for a dead subscription; TriggerManager added the trigger to its map;
the HTTP handler returned 201 for a trigger that never fires. Rethrow
from the subscriber, catch in the transport adapter to return nullptr,
and on nullptr in TriggerManager: roll back create() with a runtime
error, keep retry entries on the pending list (do not mark resolved),
re-queue restored persistent triggers for retry.
Ros2ParameterTransport::cache_default_values - empty per-parameter catch:
Per-parameter get_parameters({name}) exceptions were swallowed silently,
so reset_parameter later returned NO_DEFAULTS_CACHED with no diagnostic
trail. Log a throttled WARN with the parameter name, node name, and
exception text so the failure is debuggable from /rosout.
docs/design/index.rst - stale class layout:
Re-tag the six managers from gateway_ros2 to gateway_core to match the
post-refactor target placement. Drop the removed
ManifestDiscoveryStrategy class block and bullet, replace with
ManifestLayer. Replace JsonSerializer ownership arrows on
Operation/DataAccess Manager with Service/Action/TopicTransport
arrows. Replace the ConfigurationManager -> rclcpp::Node arrow with
ParameterTransport.
Sanitizer-tsan was intermittently failing on test_daisy_chain_aggregation because gateway_node-2 (peer_b) needed more than 15s to exit after SIGINT. The hang sat between TriggerTopicSubscriber::shutdown completion and the end of GatewayNode::~GatewayNode - i.e. somewhere in REST server stop or member-destruction order. ASan-only runs squeezed in under the 15s grace; TSan (heavier instrumentation) blew past it and the launch system escalated to SIGKILL with exit -9. Root cause: AggregationManager has no shutdown path. When SIGINT arrived mid-forward, peer_b had REST worker threads blocked inside PeerClient::forward_request's local httplib::Client::Get/Post call to peer_c. The local Client cannot be stopped externally, and the per-call read timeout (default 5000ms, ~25-50s effective under TSan) gates worker exit. cpp-httplib Server::stop unblocks the accept loop but cannot interrupt blocked client I/O on the worker side. Fix: track every active outbound httplib::Client per PeerClient and add shutdown() that calls stop() on each. PeerClient::ScopedClient RAII helper registers/unregisters with the active set and is used by forward_request, forward_and_get_json, and fetch_entities (replacing the previous local Client). AggregationManager::shutdown() forwards to every static + discovered peer. GatewayNode::~GatewayNode invokes aggregation_mgr_->shutdown() right after stopping mDNS and BEFORE stop_rest_server(), so REST workers mid-forward unwind on Error::Canceled instead of waiting out the full read timeout. Subsequent forwards after shutdown short-circuit with 503/error rather than dialling the peer.
The first cut of PeerClient::shutdown() also called client_->stop() on
the lazily-created health-check client to be thorough. That added a TSan
report ('unlock of an unlocked mutex (or by a wrong thread)') and a
SIGABRT on Rolling: the health-check client is exclusively used inside
check_health() which holds client_mutex_ around the entire Get() call;
calling stop() on it from the shutdown thread races with cpp-httplib's
own socket_mutex_ while check_health still owns the socket internals.
On glibc + libstdc++ on Noble the mutex misuse aborts the process
(test_beacon_param::test_exit_codes saw exit code -6); under TSan the
warning is emitted and the test fails with exit code 66.
The health-check client is short-lived (single GET, bounded read
timeout), so leaving it alone during shutdown only delays exit by at
most one health-check duration. The multi-second shutdown hang we set
out to fix came from in-flight forward_request / fetch_entities calls,
which use ScopedClient and ARE in the to_stop snapshot.
…wn interrupt After the previous round of shutdown work, test_leaf_collision_aggregation still escalated to SIGKILL under TSan: in-flight forward_request calls unblocked promptly, but the periodic check_health() call inside PeerClient was still blocked in client_->Get() against the unhealthy peer. That call holds client_mutex_ for the full read timeout (~5s nominal, ~25-50s under TSan), and the cpp-httplib Client cannot be interrupted from another thread unless someone calls Client::stop() on it. Register the lazily-created shared client_ with the active-client set on first creation. shutdown() then iterates the set as before and calls stop() on every registered client, which interrupts the in-flight health-check Get() the same way it interrupts forward_request and fetch_entities. The client lives for the rest of the PeerClient's lifetime so we never unregister it; declaration order guarantees active_clients_ is destroyed before client_ so no dangling pointer ever sits in the active set.
Calling httplib::Client::stop() from PeerClient::shutdown() to interrupt an in-flight Get/Post on a sibling thread is the upstream-documented way to cancel a blocking client I/O call. The cpp-httplib internal mutex sequencing in that path tickles TSan with an 'unlock of an unlocked mutex (or by a wrong thread)' on every aggregation shutdown, but the supported API contract is honoured and ASan / Rolling do not abort. TSan attributes the report to the caller (peer_client.cpp:780 in PeerClient::shutdown), so suppress by that symbol rather than by called_from_lib - the existing 'called_from_lib:libcpp-httplib.so.0.14' suppression covers race: reports but not mutex: reports against a stack whose top frame is our own translation unit.
Health-check Get() against an unresponsive peer blocks for the configured forward timeout (5s nominal). Under TSan that becomes ~25s effective and exceeds the launch_test 15s SIGINT-to-SIGKILL grace, producing exit -9. We cannot use httplib::Client::stop() to interrupt the call - cpp-httplib's stop() unlocks an internal mutex from a thread that did not lock it, and glibc + pthread debug (Ubuntu Noble) treats that as a fatal SIGABRT. Instead, hardcode the health-check timeouts at 1s (or the configured forward timeout, whichever is smaller). Health checks are ping-shaped by design; an unresponsive peer is 'unhealthy' whether we wait 1s or 5s. Capping the health timeout bounds shutdown delay to ~5s under TSan, well inside the grace window, without affecting forward semantics (forward_request and fetch_entities still use the full configured timeout via ScopedClient, which IS interruptible through the active-client registry).
Hardens the manager/transport split shipped earlier on this branch by closing correctness, observability, and lifecycle gaps surfaced during internal validation: - TriggerManager::create() now returns TriggerError::SubscribeFailed via tl::make_unexpected instead of throwing std::runtime_error out of its tl::expected-returning signature; the HTTP handler maps it to 503 through an exhaustive switch with a defensive-500 sentinel for future enum values. Rollback also clears the dispatch index and (for persistent triggers) the SQLite record. - LogManager regains observability via an injected log_sink callback; gateway_node forwards to RCLCPP_WARN/ERROR so plugin-provider exceptions surface on /rosout again instead of std::cerr. - Ros2LogSource holds its mutex through callback dispatch so a racing stop() cannot null the callback while a copy is mid-dispatch - honouring the documented post-stop contract. - core/discovery/layers/runtime_layer.hpp forward-declares Ros2RuntimeIntrospection so the header no longer transitively pulls rclcpp; test_gateway_core_smoke now includes it as a link-time guard against future transitive leaks the grep-based purity check cannot see. - In-tree includes migrate to the canonical core/managers/... paths; the six legacy shim headers emit #pragma message deprecation notes for out-of-tree consumers. - Gateway-node constructor declares parameter-service tuning parameters alongside the other declare_parameter calls, leaving the transport-wiring block to two make_shared lines. Tests + docs: - New routing tests pin two behaviours: subscribe-failure on TriggerManager::create returns the typed error variant, and persistent-trigger restore queues the entry onto unresolved_data_triggers_ when the transport rejects the re-subscription. - test_graph_event_discovery's latency assertion drops to a 2 s graph-event-specific bound so backstop-driven regressions cannot slip through the previous 30 s trivially-true assertion. - New test_service_action_resolver_contract pins the abstract interface contract; the binary links against gateway_core alone (no rclcpp). - New test_ros2_log_source.NoCallbackFiresAfterStopReturns regression test for the LogSource contract. - TSan suppressions narrow: drop mutex:PeerClient::shutdown so future genuine races attributed to our own symbols are no longer silenced; the cpp-httplib stop() pattern stays covered by called_from_lib. - docs/config/server.rst and docs/troubleshooting.rst track the new 30000 ms refresh_interval_ms default and its graph-event-driven semantics; design/index.rst gains a mock-transport code example; TopicTransport documents the deliberate ros2_medkit_serialization coupling on get_type_introspection(); LogSource documents why it is not named LogTransport.
0c6049f to
4aa1826
Compare
…ction timeouts Three related contract fixes in the ROS 2 transport adapters: - Ros2LogSource::start() now clears the shutdown latch left by a prior stop(). Without this, the subscription lambda short-circuited forever after a stop()/start() cycle, breaking the documented idempotent start/stop contract on LogSource. New regression test covers the restart path alongside the existing post-stop guarantee test. - Ros2ServiceTransport::call() bounds wait_for_service() by the caller-provided timeout instead of a hardcoded 5 s, so a missing service fails within the caller's budget (the async call below continues to bound itself via future.wait_for()). - Ros2ActionTransport::send_goal() does the same for the action-server availability wait.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 117 out of 117 changed files in this pull request and generated 6 comments.
Comments suppressed due to low confidence (2)
src/ros2_medkit_gateway/include/ros2_medkit_gateway/trigger_topic_subscriber.hpp:21
- This shim header prints a
#pragma messageon include. Because this is a public header, that will surface in downstream build output for every TU and may become unmanageable. Suggest removing the pragma and relying on deprecation via docs /[[deprecated]]attributes (or a macro that can be enabled explicitly).
src/ros2_medkit_gateway/include/ros2_medkit_gateway/configuration_manager.hpp:21 - This shim uses
#pragma messageto announce deprecation, which will emit output for every compile that includes it. That can be problematic for downstream users still on the old include path. Consider replacing it with quieter deprecation signaling (docs /[[deprecated]]/ opt-in macro) and keep only the forwarding#include.
#pragma once
#pragma message("deprecated: include ros2_medkit_gateway/core/managers/configuration_manager.hpp instead")
// Backwards-compatibility shim - header relocated to core/managers/.
#include "ros2_medkit_gateway/core/managers/configuration_manager.hpp"
| #pragma once | ||
|
|
||
| #include <memory> | ||
| #include <mutex> | ||
| #include <nlohmann/json.hpp> | ||
| #include <rclcpp/rclcpp.hpp> | ||
| #include <shared_mutex> | ||
| #include <string> | ||
| #include <unordered_map> | ||
| #include <vector> | ||
| #pragma message("deprecated: include ros2_medkit_gateway/core/managers/data_access_manager.hpp instead") | ||
|
|
||
| #include "ros2_medkit_gateway/core/data/data_types.hpp" | ||
| #include "ros2_medkit_gateway/core/type_introspection.hpp" | ||
| #include "ros2_medkit_serialization/json_serializer.hpp" | ||
|
|
||
| namespace ros2_medkit_gateway { | ||
|
|
||
| using json = nlohmann::json; | ||
|
|
||
| class TopicDataProvider; // forward decl, see data/topic_data_provider.hpp | ||
|
|
||
| class DataAccessManager { | ||
| public: | ||
| explicit DataAccessManager(rclcpp::Node * node); | ||
|
|
||
| /** | ||
| * @brief Publish data to a specific topic | ||
| * @param topic_path Full topic path (e.g., /chassis/brakes/command) | ||
| * @param msg_type ROS 2 message type (e.g., std_msgs/msg/Float32) | ||
| * @param data JSON data to publish | ||
| * @param timeout_sec Timeout for the publish operation | ||
| * @return JSON with publish status | ||
| */ | ||
| json publish_to_topic(const std::string & topic_path, const std::string & msg_type, const json & data, | ||
| double timeout_sec = 5.0); | ||
|
|
||
| /** | ||
| * @brief Get topic sample with fallback to metadata on timeout | ||
| * | ||
| * If the topic is publishing, returns actual data with type info. | ||
| * If the topic times out, returns metadata (type, schema, pub/sub counts) instead of error. | ||
| * | ||
| * @param topic_name Full topic path (e.g., "/powertrain/engine/temperature") | ||
| * @param timeout_sec Timeout for data retrieval. Use -1.0 to use the topic_sample_timeout_sec parameter (default) | ||
| * @return JSON object with one of two structures: | ||
| * - status="data": {topic, timestamp, data, status, type, type_info, publisher_count, subscriber_count} | ||
| * - status="metadata_only": {topic, timestamp, status, type, type_info, publisher_count, subscriber_count} | ||
| * @throws TopicNotAvailableException if topic doesn't exist or metadata cannot be retrieved | ||
| */ | ||
| json get_topic_sample_with_fallback(const std::string & topic_name, double timeout_sec = -1.0); | ||
|
|
||
| /** | ||
| * @brief Get the type introspection instance | ||
| */ | ||
| TypeIntrospection * get_type_introspection() const { | ||
| return type_introspection_.get(); | ||
| } | ||
|
|
||
| /** | ||
| * @brief Attach a TopicDataProvider for sampling. | ||
| * | ||
| * The provider owns the pool-backed subscription path (issue #375 race fix). | ||
| * Non-owning pointer; caller retains ownership. Safe to call once at wiring | ||
| * time; no concurrent access to the setter. | ||
| */ | ||
| void set_topic_data_provider(TopicDataProvider * provider) { | ||
| topic_data_provider_ = provider; | ||
| } | ||
|
|
||
| TopicDataProvider * get_topic_data_provider() const { | ||
| return topic_data_provider_; | ||
| } | ||
|
|
||
| /** | ||
| * @brief Get single topic sample using native rclcpp APIs | ||
| * | ||
| * Fast path for single topic sampling with publisher count check. | ||
| * | ||
| * @param topic_name Full topic path | ||
| * @param timeout_sec Timeout for sampling (only used if topic has publishers) | ||
| * @return JSON with topic data or metadata | ||
| */ | ||
| json get_topic_sample_native(const std::string & topic_name, double timeout_sec = 1.0); | ||
|
|
||
| /** | ||
| * @brief Get the configured topic sample timeout | ||
| * @return Timeout in seconds for topic sampling | ||
| */ | ||
| double get_topic_sample_timeout() const { | ||
| return topic_sample_timeout_sec_; | ||
| } | ||
|
|
||
| private: | ||
| /** | ||
| * @brief Convert TopicSampleResult to JSON with type info enrichment | ||
| */ | ||
| json sample_result_to_json(const TopicSampleResult & sample); | ||
|
|
||
| /** | ||
| * @brief Get or create a cached GenericPublisher for a topic | ||
| * @param topic_path Full topic path | ||
| * @param msg_type ROS 2 message type | ||
| * @return Shared pointer to GenericPublisher | ||
| */ | ||
| rclcpp::GenericPublisher::SharedPtr get_or_create_publisher(const std::string & topic_path, | ||
| const std::string & msg_type); | ||
|
|
||
| rclcpp::Node * node_; | ||
|
|
||
| /// JSON serializer for native message serialization | ||
| std::shared_ptr<ros2_medkit_serialization::JsonSerializer> serializer_; | ||
|
|
||
| /// Cached publishers (topic+type -> publisher) | ||
| std::unordered_map<std::string, rclcpp::GenericPublisher::SharedPtr> publishers_; | ||
|
|
||
| /// Mutex for thread-safe publisher cache access | ||
| mutable std::shared_mutex publishers_mutex_; | ||
|
|
||
| std::unique_ptr<TypeIntrospection> type_introspection_; | ||
| TopicDataProvider * topic_data_provider_{nullptr}; ///< Non-owning; set at wiring time. | ||
| double topic_sample_timeout_sec_; | ||
|
|
||
| /** | ||
| * @brief Get default timeout for topic sampling (from parameter) | ||
| */ | ||
| double get_default_topic_timeout() const { | ||
| return topic_sample_timeout_sec_; | ||
| } | ||
| }; | ||
|
|
||
| } // namespace ros2_medkit_gateway | ||
| // Backwards-compatibility shim - header relocated to core/managers/. | ||
| #include "ros2_medkit_gateway/core/managers/data_access_manager.hpp" |
| #pragma once | ||
|
|
||
| #include <memory> | ||
| #include <mutex> | ||
| #include <nlohmann/json.hpp> | ||
| #include <rclcpp/rclcpp.hpp> | ||
| #include <string> | ||
| #include <vector> | ||
| #pragma message("deprecated: include ros2_medkit_gateway/core/managers/fault_manager.hpp instead") | ||
|
|
||
| #include "ros2_medkit_msgs/msg/environment_data.hpp" | ||
| #include "ros2_medkit_msgs/msg/fault.hpp" | ||
| #include "ros2_medkit_msgs/srv/clear_fault.hpp" | ||
| #include "ros2_medkit_msgs/srv/get_fault.hpp" | ||
| #include "ros2_medkit_msgs/srv/get_rosbag.hpp" | ||
| #include "ros2_medkit_msgs/srv/get_snapshots.hpp" | ||
| #include "ros2_medkit_msgs/srv/list_faults.hpp" | ||
| #include "ros2_medkit_msgs/srv/list_rosbags.hpp" | ||
| #include "ros2_medkit_msgs/srv/report_fault.hpp" | ||
|
|
||
| namespace ros2_medkit_gateway { | ||
|
|
||
| using json = nlohmann::json; | ||
|
|
||
| /// Result of a fault operation | ||
| struct FaultResult { | ||
| bool success; | ||
| json data; | ||
| std::string error_message; | ||
| }; | ||
|
|
||
| /// Result of get_fault operation with full message types | ||
| struct FaultWithEnvResult { | ||
| bool success; | ||
| std::string error_message; | ||
| ros2_medkit_msgs::msg::Fault fault; | ||
| ros2_medkit_msgs::msg::EnvironmentData environment_data; | ||
| }; | ||
|
|
||
| /// Manager for fault management operations | ||
| /// Provides interface to the ros2_medkit_fault_manager services | ||
| class FaultManager { | ||
| public: | ||
| explicit FaultManager(rclcpp::Node * node); | ||
|
|
||
| /// Report a fault from a component | ||
| /// @param fault_code Unique fault identifier | ||
| /// @param severity Fault severity (0=INFO, 1=WARN, 2=ERROR, 3=CRITICAL) | ||
| /// @param description Human-readable description | ||
| /// @param source_id Component identifier (namespace path) | ||
| /// @return FaultResult with success status | ||
| FaultResult report_fault(const std::string & fault_code, uint8_t severity, const std::string & description, | ||
| const std::string & source_id); | ||
|
|
||
| /// Get all faults, optionally filtered by component | ||
| /// @param source_id Optional component identifier to filter by (empty = all) | ||
| /// @param include_prefailed Include PREFAILED status faults (debounce not yet confirmed) | ||
| /// @param include_confirmed Include CONFIRMED status faults | ||
| /// @param include_cleared Include CLEARED status faults | ||
| /// @param include_healed Include HEALED and PREPASSED status faults | ||
| /// @param include_muted Include muted faults (correlation symptoms) in response | ||
| /// @param include_clusters Include cluster info in response | ||
| /// @return FaultResult with array of faults (and optionally muted_faults and clusters) | ||
| FaultResult list_faults(const std::string & source_id = "", bool include_prefailed = true, | ||
| bool include_confirmed = true, bool include_cleared = false, bool include_healed = false, | ||
| bool include_muted = false, bool include_clusters = false); | ||
|
|
||
| /// Get a specific fault by code with environment data | ||
| /// @param fault_code Fault identifier | ||
| /// @param source_id Optional component identifier to verify fault belongs to component | ||
| /// @return FaultWithEnvResult with fault and environment_data, or error if not found | ||
| FaultWithEnvResult get_fault_with_env(const std::string & fault_code, const std::string & source_id = ""); | ||
|
|
||
| /// Get a specific fault by code (JSON result - legacy) | ||
| /// @param fault_code Fault identifier | ||
| /// @param source_id Optional component identifier to verify fault belongs to component | ||
| /// @return FaultResult with fault data or error if not found | ||
| /// @note Thread-safe: delegates to get_fault_with_env() which acquires get_mutex_. | ||
| /// Do NOT call this method while holding get_mutex_. | ||
| FaultResult get_fault(const std::string & fault_code, const std::string & source_id = ""); | ||
|
|
||
| /// Clear a fault | ||
| /// @param fault_code Fault identifier to clear | ||
| /// @return FaultResult with success status | ||
| FaultResult clear_fault(const std::string & fault_code); | ||
|
|
||
| /// Get snapshots for a fault | ||
| /// @param fault_code Fault identifier | ||
| /// @param topic Optional topic filter (empty = all topics) | ||
| /// @return FaultResult with snapshot data (JSON in data field) | ||
| FaultResult get_snapshots(const std::string & fault_code, const std::string & topic = ""); | ||
|
|
||
| /// Get rosbag file info for a fault | ||
| /// @param fault_code Fault identifier | ||
| /// @return FaultResult with rosbag file path and metadata | ||
| FaultResult get_rosbag(const std::string & fault_code); | ||
|
|
||
| /// Get all rosbag files for an entity (batch operation) | ||
| /// @param entity_fqn Entity fully qualified name for prefix matching | ||
| /// @return FaultResult with arrays of rosbag metadata | ||
| FaultResult list_rosbags(const std::string & entity_fqn); | ||
|
|
||
| /// Check if fault manager service is available | ||
| /// @return true if services are available | ||
| bool is_available() const; | ||
|
|
||
| /// Convert Fault message to JSON (static utility for reuse by SSE handler) | ||
| static json fault_to_json(const ros2_medkit_msgs::msg::Fault & fault); | ||
|
|
||
| private: | ||
| /// Wait for services to become available | ||
| bool wait_for_services(std::chrono::duration<double> timeout); | ||
|
|
||
| rclcpp::Node * node_; | ||
|
|
||
| /// Service clients | ||
| rclcpp::Client<ros2_medkit_msgs::srv::ReportFault>::SharedPtr report_fault_client_; | ||
| rclcpp::Client<ros2_medkit_msgs::srv::GetFault>::SharedPtr get_fault_client_; | ||
| rclcpp::Client<ros2_medkit_msgs::srv::ListFaults>::SharedPtr list_faults_client_; | ||
| rclcpp::Client<ros2_medkit_msgs::srv::ClearFault>::SharedPtr clear_fault_client_; | ||
| rclcpp::Client<ros2_medkit_msgs::srv::GetSnapshots>::SharedPtr get_snapshots_client_; | ||
| rclcpp::Client<ros2_medkit_msgs::srv::GetRosbag>::SharedPtr get_rosbag_client_; | ||
| rclcpp::Client<ros2_medkit_msgs::srv::ListRosbags>::SharedPtr list_rosbags_client_; | ||
|
|
||
| /// Service timeout | ||
| double service_timeout_sec_{5.0}; | ||
| std::string fault_manager_base_path_{"/fault_manager"}; | ||
|
|
||
| /// Per-client mutexes for thread-safe service calls. | ||
| /// Split by service client so that read operations (list, get) are not blocked | ||
| /// by slow write operations (report_fault with snapshot capture). | ||
| mutable std::mutex report_mutex_; | ||
| mutable std::mutex list_mutex_; | ||
| mutable std::mutex get_mutex_; | ||
| mutable std::mutex clear_mutex_; | ||
| mutable std::mutex snapshots_mutex_; | ||
| mutable std::mutex rosbag_mutex_; | ||
| mutable std::mutex list_rosbags_mutex_; | ||
| }; | ||
|
|
||
| } // namespace ros2_medkit_gateway | ||
| // Backwards-compatibility shim - header relocated to core/managers/. | ||
| #include "ros2_medkit_gateway/core/managers/fault_manager.hpp" |
| } | ||
| if (sample.status == "data" && !sample.data.is_null()) { | ||
| out["data"] = sample.data; | ||
| out["timestamp"] = | ||
| std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()) | ||
| .count(); | ||
| } |
| const auto timeout_secs = | ||
| std::chrono::milliseconds{static_cast<std::int64_t>(std::max(timeout.count(), 0.0) * 1000.0)}; | ||
| auto future_status = future_and_id.wait_for(timeout_secs); | ||
|
|
||
| if (future_status != std::future_status::ready) { | ||
| client->remove_pending_request(future_and_id.request_id); | ||
| ros2_medkit_serialization::destroy_ros_message(&ros_request); | ||
| result.success = false; | ||
| result.error_message = | ||
| "Service call timed out (" + std::to_string(static_cast<int>(timeout.count())) + "s): " + service_path; | ||
| return result; |
| #pragma once | ||
|
|
||
| #include <action_msgs/msg/goal_status_array.hpp> | ||
| #include <array> | ||
| #include <chrono> | ||
| #include <map> | ||
| #include <memory> | ||
| #include <mutex> | ||
| #include <nlohmann/json.hpp> | ||
| #include <optional> | ||
| #include <random> | ||
| #include <rclcpp/rclcpp.hpp> | ||
| #include <shared_mutex> | ||
| #include <string> | ||
| #include <vector> | ||
| #pragma message("deprecated: include ros2_medkit_gateway/core/managers/operation_manager.hpp instead") | ||
|
|
||
| #include "ros2_medkit_gateway/compat/generic_client_compat.hpp" | ||
| #include "ros2_medkit_gateway/core/discovery/models/common.hpp" | ||
| #include "ros2_medkit_gateway/discovery/discovery_manager.hpp" | ||
| #include "ros2_medkit_serialization/json_serializer.hpp" | ||
| #include "ros2_medkit_serialization/service_action_types.hpp" | ||
|
|
||
| namespace ros2_medkit_gateway { | ||
|
|
||
| class ResourceChangeNotifier; | ||
|
|
||
| using json = nlohmann::json; | ||
|
|
||
| /// Result of a synchronous service call | ||
| struct ServiceCallResult { | ||
| bool success; | ||
| json response; | ||
| std::string error_message; | ||
| }; | ||
|
|
||
| /// Action goal status (matches ROS2 action_msgs/msg/GoalStatus) | ||
| enum class ActionGoalStatus : int8_t { | ||
| UNKNOWN = 0, | ||
| ACCEPTED = 1, | ||
| EXECUTING = 2, | ||
| CANCELING = 3, | ||
| SUCCEEDED = 4, | ||
| CANCELED = 5, | ||
| ABORTED = 6 | ||
| }; | ||
|
|
||
| /// Convert status enum to string | ||
| std::string action_status_to_string(ActionGoalStatus status); | ||
|
|
||
| /// Result of sending an action goal | ||
| struct ActionSendGoalResult { | ||
| bool success; | ||
| std::string goal_id; // UUID hex string | ||
| bool goal_accepted; | ||
| std::string error_message; | ||
| }; | ||
|
|
||
| /// Result of canceling an action goal | ||
| struct ActionCancelResult { | ||
| bool success; | ||
| int8_t return_code; // 0=accepted, 1=rejected, 2=unknown_id, 3=terminated | ||
| std::string error_message; | ||
| }; | ||
|
|
||
| /// Result of getting action result | ||
| struct ActionGetResultResult { | ||
| bool success; | ||
| ActionGoalStatus status; | ||
| json result; | ||
| std::string error_message; | ||
| }; | ||
|
|
||
| /// Tracked action goal info (stored locally) | ||
| struct ActionGoalInfo { | ||
| std::string goal_id; | ||
| std::string action_path; // e.g., /powertrain/engine/long_calibration | ||
| std::string action_type; // e.g., example_interfaces/action/Fibonacci | ||
| std::string entity_id; // SOVD entity ID (e.g., engine) for trigger notifications | ||
| ActionGoalStatus status; | ||
| json last_feedback; | ||
| std::chrono::system_clock::time_point created_at; | ||
| std::chrono::system_clock::time_point last_update; | ||
| }; | ||
|
|
||
| /// Manager for ROS2 operations (services and actions) | ||
| /// Handles service calls synchronously and action calls asynchronously | ||
| class OperationManager { | ||
| public: | ||
| explicit OperationManager(rclcpp::Node * node, DiscoveryManager * discovery_manager); | ||
|
|
||
| ~OperationManager(); | ||
|
|
||
| OperationManager(const OperationManager &) = delete; | ||
| OperationManager & operator=(const OperationManager &) = delete; | ||
| OperationManager(OperationManager &&) = delete; | ||
| OperationManager & operator=(OperationManager &&) = delete; | ||
|
|
||
| /// Explicitly release subscriptions, clients, and tracked goals. | ||
| /// Call while executor is still running to allow safe callback cleanup. | ||
| /// Called automatically by destructor, but GatewayNode calls it earlier | ||
| /// during its shutdown sequence. | ||
| void shutdown(); | ||
|
|
||
| /// Set optional notifier for broadcasting operation status changes to trigger subsystem. | ||
| void set_notifier(ResourceChangeNotifier * notifier); | ||
|
|
||
| /// Call a ROS2 service synchronously | ||
| /// @param service_path Full service path (e.g., "/powertrain/engine/calibrate") | ||
| /// @param service_type Service type (e.g., "std_srvs/srv/Trigger") | ||
| /// @param request JSON request body | ||
| /// @return ServiceCallResult with response or error | ||
| ServiceCallResult call_service(const std::string & service_path, const std::string & service_type, | ||
| const json & request); | ||
|
|
||
| /// Find and call a service by component and operation name | ||
| /// Uses discovery cache to resolve service path and type if not provided | ||
| /// @param component_ns Component namespace (e.g., "/powertrain/engine") | ||
| /// @param operation_name Operation name (e.g., "calibrate") | ||
| /// @param service_type Optional service type override | ||
| /// @param request JSON request body | ||
| /// @return ServiceCallResult with response or error | ||
| ServiceCallResult call_component_service(const std::string & component_ns, const std::string & operation_name, | ||
| const std::optional<std::string> & service_type, const json & request); | ||
|
|
||
| /// Validate message type format (package/srv/Type or package/action/Type) | ||
| static bool is_valid_message_type(const std::string & type); | ||
|
|
||
| /// Validate UUID hex string format (32 hex characters) | ||
| static bool is_valid_uuid_hex(const std::string & uuid_hex); | ||
|
|
||
| /// Check if type is a service type (contains /srv/) | ||
| static bool is_service_type(const std::string & type); | ||
|
|
||
| /// Check if type is an action type (contains /action/) | ||
| static bool is_action_type(const std::string & type); | ||
|
|
||
| // ==================== ACTION OPERATIONS ==================== | ||
|
|
||
| /// Send a goal to an action server using native rclcpp_action internal services | ||
| /// @param action_path Full action path (e.g., "/powertrain/engine/long_calibration") | ||
| /// @param action_type Action type (e.g., "example_interfaces/action/Fibonacci") | ||
| /// @param goal JSON goal data | ||
| /// @param entity_id SOVD entity ID for trigger notifications (e.g., "engine") | ||
| /// @return ActionSendGoalResult with goal_id or error | ||
| ActionSendGoalResult send_action_goal(const std::string & action_path, const std::string & action_type, | ||
| const json & goal, const std::string & entity_id = ""); | ||
|
|
||
| /// Send a goal to an action using component namespace and operation name | ||
| /// Uses discovery cache to resolve action path and type if not provided | ||
| /// @param component_ns Component namespace (e.g., "/powertrain/engine") | ||
| /// @param operation_name Operation name (e.g., "long_calibration") | ||
| /// @param action_type Optional action type override | ||
| /// @param goal JSON goal data | ||
| /// @param entity_id SOVD entity ID for trigger notifications (e.g., "engine") | ||
| /// @return ActionSendGoalResult with goal_id or error | ||
| ActionSendGoalResult send_component_action_goal(const std::string & component_ns, const std::string & operation_name, | ||
| const std::optional<std::string> & action_type, const json & goal, | ||
| const std::string & entity_id = ""); | ||
|
|
||
| /// Cancel a running action goal using ros2 action cancel | ||
| /// @param action_path Full action path | ||
| /// @param goal_id Goal UUID hex string | ||
| /// @return ActionCancelResult | ||
| ActionCancelResult cancel_action_goal(const std::string & action_path, const std::string & goal_id); | ||
|
|
||
| /// Get the result of a completed action | ||
| /// This is a blocking call that waits for the action to complete | ||
| /// @param action_path Full action path | ||
| /// @param action_type Action type | ||
| /// @param goal_id Goal UUID hex string | ||
| /// @return ActionGetResultResult with result or error | ||
| ActionGetResultResult get_action_result(const std::string & action_path, const std::string & action_type, | ||
| const std::string & goal_id); | ||
|
|
||
| /// Get tracked goal info by goal_id | ||
| /// @param goal_id Goal UUID hex string | ||
| /// @return Optional ActionGoalInfo if found | ||
| std::optional<ActionGoalInfo> get_tracked_goal(const std::string & goal_id) const; | ||
|
|
||
| /// List all tracked goals | ||
| /// @return Vector of all tracked goals | ||
| std::vector<ActionGoalInfo> list_tracked_goals() const; | ||
|
|
||
| /// Get all goals for a specific action path | ||
| /// @param action_path Full action path (e.g., "/powertrain/engine/long_calibration") | ||
| /// @return Vector of goals for that action, sorted by last_update (newest first) | ||
| std::vector<ActionGoalInfo> get_goals_for_action(const std::string & action_path) const; | ||
|
|
||
| /// Get the most recent goal for a specific action path | ||
| /// @param action_path Full action path | ||
| /// @return Optional ActionGoalInfo if any goal exists for this action | ||
| std::optional<ActionGoalInfo> get_latest_goal_for_action(const std::string & action_path) const; | ||
|
|
||
| /// Update goal status in tracking | ||
| /// @param goal_id Goal UUID hex string | ||
| /// @param status New status | ||
| void update_goal_status(const std::string & goal_id, ActionGoalStatus status); | ||
|
|
||
| /// Update goal feedback in tracking | ||
| /// @param goal_id Goal UUID hex string | ||
| /// @param feedback New feedback JSON | ||
| void update_goal_feedback(const std::string & goal_id, const json & feedback); | ||
|
|
||
| /// Remove completed goals older than specified duration | ||
| /// @param max_age Maximum age of completed goals to keep | ||
| void cleanup_old_goals(std::chrono::seconds max_age = std::chrono::seconds(300)); | ||
|
|
||
| /// Subscribe to action status topic for real-time updates | ||
| /// Called automatically when a goal is sent | ||
| /// @param action_path Full action path (e.g., "/powertrain/engine/long_calibration") | ||
| void subscribe_to_action_status(const std::string & action_path); | ||
|
|
||
| /// Unsubscribe from action status topic | ||
| /// Called when no more active goals exist for this action | ||
| /// @param action_path Full action path | ||
| void unsubscribe_from_action_status(const std::string & action_path); | ||
|
|
||
| private: | ||
| /// Set of clients for an action (internal services) | ||
| struct ActionClientSet { | ||
| compat::GenericServiceClient::SharedPtr send_goal_client; | ||
| compat::GenericServiceClient::SharedPtr get_result_client; | ||
| compat::GenericServiceClient::SharedPtr cancel_goal_client; | ||
| std::string action_type; // Store type for later use | ||
| }; | ||
|
|
||
| /// Convert UUID hex string to JSON array of byte values | ||
| json uuid_hex_to_json_array(const std::string & uuid_hex); | ||
|
|
||
| /// Generate a random UUID | ||
| std::array<uint8_t, 16> generate_uuid(); | ||
|
|
||
| /// Convert UUID bytes to JSON array | ||
| json uuid_bytes_to_json_array(const std::array<uint8_t, 16> & uuid); | ||
|
|
||
| /// Track a new goal | ||
| void track_goal(const std::string & goal_id, const std::string & action_path, const std::string & action_type, | ||
| const std::string & entity_id); | ||
|
|
||
| /// Get or create a cached GenericServiceClient for a service | ||
| compat::GenericServiceClient::SharedPtr get_or_create_service_client(const std::string & service_path, | ||
| const std::string & service_type); | ||
|
|
||
| /// Get or create cached action clients for an action | ||
| ActionClientSet & get_or_create_action_clients(const std::string & action_path, const std::string & action_type); | ||
|
|
||
| /// Make cache key from service path and type | ||
| static std::string make_client_key(const std::string & service_path, const std::string & service_type); | ||
|
|
||
| rclcpp::Node * node_; | ||
| DiscoveryManager * discovery_manager_; | ||
| ResourceChangeNotifier * notifier_ = nullptr; | ||
|
|
||
| /// Random number generator for UUID generation | ||
| std::mt19937 rng_; | ||
| std::mutex rng_mutex_; | ||
|
|
||
| /// Native JSON serializer for service calls | ||
| std::shared_ptr<ros2_medkit_serialization::JsonSerializer> serializer_; | ||
|
|
||
| /// Cache for GenericServiceClient instances (key = "service_path|service_type") | ||
| mutable std::shared_mutex clients_mutex_; | ||
| std::map<std::string, compat::GenericServiceClient::SharedPtr> generic_clients_; | ||
|
|
||
| /// Cache for action client sets (key = action_path) | ||
| std::map<std::string, ActionClientSet> action_clients_; | ||
|
|
||
| /// Timeout for service calls in seconds (configurable via service_call_timeout_sec param) | ||
| int service_call_timeout_sec_; | ||
|
|
||
| /// Map of goal_id -> ActionGoalInfo for tracking active goals | ||
| mutable std::mutex goals_mutex_; | ||
| std::map<std::string, ActionGoalInfo> tracked_goals_; | ||
|
|
||
| /// Callback for action status topic updates | ||
| void on_action_status(const std::string & action_path, const action_msgs::msg::GoalStatusArray::ConstSharedPtr & msg); | ||
|
|
||
| /// Convert goal UUID bytes to hex string | ||
| std::string uuid_bytes_to_hex(const std::array<uint8_t, 16> & uuid) const; | ||
|
|
||
| /// Map of action_path -> status subscription | ||
| mutable std::mutex subscriptions_mutex_; | ||
| std::map<std::string, rclcpp::Subscription<action_msgs::msg::GoalStatusArray>::SharedPtr> status_subscriptions_; | ||
| }; | ||
|
|
||
| } // namespace ros2_medkit_gateway | ||
| // Backwards-compatibility shim - header relocated to core/managers/. | ||
| #include "ros2_medkit_gateway/core/managers/operation_manager.hpp" |
| #pragma once | ||
|
|
||
| #include <atomic> | ||
| #include <deque> | ||
| #include <functional> | ||
| #include <mutex> | ||
| #include <nlohmann/json.hpp> | ||
| #include <optional> | ||
| #include <rcl_interfaces/msg/log.hpp> | ||
| #include <rclcpp/rclcpp.hpp> | ||
| #include <string> | ||
| #include <tl/expected.hpp> | ||
| #include <unordered_map> | ||
| #include <vector> | ||
| #pragma message("deprecated: include ros2_medkit_gateway/core/managers/log_manager.hpp instead") | ||
|
|
||
| #include "ros2_medkit_gateway/core/log_types.hpp" | ||
| #include "ros2_medkit_gateway/core/providers/log_provider.hpp" | ||
|
|
||
| namespace ros2_medkit_gateway { | ||
|
|
||
| using json = nlohmann::json; | ||
|
|
||
| class PluginManager; // forward declaration — full include in .cpp | ||
| class ResourceChangeNotifier; // forward declaration | ||
|
|
||
| /** | ||
| * @brief Manages /rosout log collection via the default ring-buffer backend. | ||
| * | ||
| * Subscribes to /rosout, normalizes logger names (strips leading '/'), | ||
| * and stores entries per node-name in fixed-size deque ring buffers. | ||
| * | ||
| * Plugin integration (two modes): | ||
| * - **Observer mode** (default): If a LogProvider plugin is registered, get_logs() | ||
| * and get_config() delegate to it. on_log_entry() is called on ALL LogProvider | ||
| * observers for every /rosout message. Observers returning true suppress ring-buffer | ||
| * storage. | ||
| * - **Full ingestion** (manages_ingestion() == true): The primary LogProvider owns | ||
| * the entire log pipeline. LogManager skips the /rosout subscription and ring buffer | ||
| * entirely. All queries and config operations delegate to the plugin. | ||
| * | ||
| * FQN normalization: | ||
| * - entity.fqn from the entity cache has a leading '/' (e.g. "/powertrain/engine/temp_sensor") | ||
| * - /rosout msg.name does NOT have a leading '/' (rcl_node_get_logger_name convention) | ||
| * - Callers pass raw FQNs from entity.fqn; LogManager strips leading '/' internally. | ||
| */ | ||
| class LogManager { | ||
| public: | ||
| /// Default maximum number of entries retained per node in the ring buffer | ||
| static constexpr size_t kDefaultBufferSize = 200; | ||
|
|
||
| /** | ||
| * @brief Construct LogManager | ||
| * | ||
| * If the primary LogProvider's manages_ingestion() returns true, the /rosout | ||
| * subscription and ring buffer are skipped entirely. Otherwise subscribes to | ||
| * /rosout as usual. | ||
| * | ||
| * @param node ROS 2 node (used for subscription and logging) | ||
| * @param plugin_mgr PluginManager for LogProvider lookup (may be nullptr) | ||
| * @param max_buffer_size Ring buffer size per node (override for unit testing) | ||
| */ | ||
| explicit LogManager(rclcpp::Node * node, PluginManager * plugin_mgr = nullptr, | ||
| size_t max_buffer_size = kDefaultBufferSize); | ||
|
|
||
| ~LogManager(); | ||
|
|
||
| LogManager(const LogManager &) = delete; | ||
| LogManager & operator=(const LogManager &) = delete; | ||
| LogManager(LogManager &&) = delete; | ||
| LogManager & operator=(LogManager &&) = delete; | ||
|
|
||
| /** | ||
| * @brief Query log entries for a set of node FQNs | ||
| * | ||
| * If a LogProvider plugin is registered, delegates to it. | ||
| * Otherwise uses the local ring buffer. | ||
| * | ||
| * @param node_fqns Node FQNs from entity.fqn (WITH leading '/' — normalized internally) | ||
| * @param prefix_match If true, match all buffered nodes whose name starts with the given prefix | ||
| * (used for Component queries). If false, exact match (App queries). | ||
| * @param min_severity Additional severity filter from query parameter. Empty = no override. | ||
| * @param context_filter Substring filter applied to log entry's name (logger name). Empty = no filter. | ||
| * @param entity_id Entity ID for config lookup. Empty = use defaults. | ||
| * @return JSON array of LogEntry objects sorted by id ascending, capped at entity config max_entries. | ||
| */ | ||
| tl::expected<json, std::string> get_logs(const std::vector<std::string> & node_fqns, bool prefix_match, | ||
| const std::string & min_severity, const std::string & context_filter, | ||
| const std::string & entity_id); | ||
|
|
||
| /// Get current log configuration for entity (returns defaults if unconfigured) | ||
| tl::expected<LogConfig, std::string> get_config(const std::string & entity_id) const; | ||
|
|
||
| /** | ||
| * @brief Update log configuration for an entity | ||
| * @return Empty string on success, error message on validation failure | ||
| */ | ||
| std::string update_config(const std::string & entity_id, const std::optional<std::string> & severity_filter, | ||
| const std::optional<size_t> & max_entries); | ||
|
|
||
| /** | ||
| * @brief Programmatically add a log entry (e.g. from trigger log_settings) | ||
| * | ||
| * Creates a LogEntry and pushes it to the internal ring buffer using the | ||
| * same path as on_rosout(). If a ResourceChangeNotifier is set, emits a | ||
| * "logs" CREATED notification so triggers can observe log changes. | ||
| * | ||
| * @param entity_id Entity to associate the log with (used as logger name) | ||
| * @param severity SOVD severity string (debug, info, warning, error, fatal) | ||
| * @param message Human-readable log message | ||
| * @param metadata Additional JSON metadata stored in the message (appended) | ||
| */ | ||
| void add_log_entry(const std::string & entity_id, const std::string & severity, const std::string & message, | ||
| const nlohmann::json & metadata); | ||
|
|
||
| /// Set the ResourceChangeNotifier for emitting log change events. | ||
| /// Called by GatewayNode after both LogManager and the notifier are available. | ||
| void set_notifier(ResourceChangeNotifier * notifier); | ||
|
|
||
| /// Callback that maps a ROS 2 node FQN to a manifest entity ID. | ||
| /// Returns empty string if the FQN cannot be resolved. | ||
| using NodeToEntityFn = std::function<std::string(const std::string &)>; | ||
|
|
||
| /// Set the node-to-entity resolver for trigger notifications. | ||
| /// When set, on_rosout() resolves node names to manifest entity IDs before | ||
| /// notifying the ResourceChangeNotifier. | ||
| void set_node_to_entity_resolver(NodeToEntityFn resolver); | ||
|
|
||
| // ---- Static utilities (no ROS 2 required — safe in unit tests) ---- | ||
|
|
||
| /// Convert ROS 2 uint8 log level -> SOVD severity string ("debug" for unknown levels) | ||
| static std::string level_to_severity(uint8_t level); | ||
|
|
||
| /// Convert SOVD severity string -> ROS 2 uint8 log level (0 for invalid/empty) | ||
| static uint8_t severity_to_level(const std::string & severity); | ||
|
|
||
| /// Check if a severity string is valid (one of: debug, info, warning, error, fatal) | ||
| static bool is_valid_severity(const std::string & severity); | ||
|
|
||
| /// Format a LogEntry as SOVD JSON (id, timestamp, severity, message, context) | ||
| static json entry_to_json(const LogEntry & entry); | ||
|
|
||
| /// Strip leading '/' from a node FQN for ring-buffer key normalization | ||
| static std::string normalize_fqn(const std::string & fqn); | ||
|
|
||
| // ---- Test injection (for unit tests — do not use in production code) ---- | ||
|
|
||
| /** | ||
| * @brief Inject a log entry directly into the ring buffer (bypasses /rosout subscription) | ||
| * | ||
| * Used by unit tests to populate the buffer without a live ROS 2 graph. | ||
| * In production the buffer is populated exclusively by the /rosout callback. | ||
| */ | ||
| void inject_entry_for_testing(LogEntry entry); | ||
|
|
||
| private: | ||
| void on_rosout(const rcl_interfaces::msg::Log::ConstSharedPtr & msg); | ||
|
|
||
| /// Get the effective LogProvider: plugin if registered, else nullptr (use local buffer) | ||
| LogProvider * effective_provider() const; | ||
|
|
||
| rclcpp::Node * node_; | ||
| PluginManager * plugin_mgr_; | ||
| ResourceChangeNotifier * notifier_ = nullptr; | ||
| /// Write-once: must be set before ROS executor starts spinning. | ||
| /// After that, only read from subscription callbacks (no mutex needed). | ||
| NodeToEntityFn node_to_entity_resolver_; | ||
| size_t max_buffer_size_; | ||
|
|
||
| rclcpp::Subscription<rcl_interfaces::msg::Log>::SharedPtr rosout_sub_; | ||
|
|
||
| // Ring buffers keyed by normalized node name (no leading '/') | ||
| // e.g. "powertrain/engine/temp_sensor" -> deque<LogEntry> | ||
| std::unordered_map<std::string, std::deque<LogEntry>> buffers_; | ||
| mutable std::mutex buffers_mutex_; | ||
|
|
||
| // Per-entity configuration keyed by entity_id | ||
| std::unordered_map<std::string, LogConfig> configs_; | ||
| mutable std::mutex configs_mutex_; | ||
|
|
||
| // Monotonically increasing entry ID (never resets; starts at 1) | ||
| std::atomic<int64_t> next_id_{1}; | ||
| }; | ||
|
|
||
| } // namespace ros2_medkit_gateway | ||
| // Backwards-compatibility shim - header relocated to core/managers/. | ||
| #include "ros2_medkit_gateway/core/managers/log_manager.hpp" |
Pull Request
Summary
This PR is the second pass after #391: each SOVD manager keeps its routing and
business logic, but the ROS 2 binding is extracted behind neutral Transport
adapters. Manager unit tests can now compose with mock transports and link only
against
gateway_core, which removesrclcppfrom the unit-test link line andeliminates the executor-lifecycle setup that was the root cause of intermittent
test failures (
process spawned but graph not yet seen,subscription destroyed mid-callback).The change also folds
RuntimeDiscoveryStrategyinto the existingIntrospectionProviderchain (built-in graph queries become just anotherprovider), relocates
TypeIntrospectionnext to the rest of therosidl_typesupportglue inros2_medkit_serialization, and switches theruntime-discovery refresh from cyclic polling to rclcpp graph-event driven, with
the timer kept only as a low-frequency safety backstop.
This PR stacks on #392 (
feat/medkit-core-scaffold). When #392 merges, GitHubauto-rebases this branch onto
main.Issue
Link the related issue (required):
Type
Testing
How was this tested / how should reviewers verify it?
The changes were exercised through the full local test suite. Reviewers can
reproduce by running, from the gateway worktree:
colcon build --symlink-install- whole workspace compiles cleanly(15 packages, no errors)
./scripts/test.sh unit- 2727 unit tests, 0 failures, 0 errors. Thenew mock-transport tests link only against
gateway_core+ GTest withno
ament_target_dependencies../scripts/test.sh lint --packages-select ros2_medkit_gateway- linterspass (clang-format, copyright, cmake-lint, flake8, pep257, xmllint)
./scripts/test.sh integ --packages-select ros2_medkit_integration_tests- 4501 tests, 0 failures, including the
test_graph_event_discoveryand
test_triggers_persistentfeaturescolcon build --packages-select ros2_medkit_gateway --cmake-args -DENABLE_CLANG_TIDY=ONthen
./scripts/test.sh tidy --packages-select ros2_medkit_gateway-clang-tidy clean on every changed file
colcon test --packages-select ros2_medkit_gateway --ctest-args -R "gateway_core_smoke"- the smoke test links only
gateway_core+ GTest and proves all eightprovider interfaces, seven Transport ports, and six manager class names
are reachable without rclcpp
To verify the discovery latency improvement, launch the gateway, then
ros2 runa demo node and observe/appslisting the new entity below 500 ms.Checklist
Commits
13 commits build up the main change in self-contained, reviewable steps. Each
step keeps the suite green and ends with the project compiling cleanly.
refactor(gateway): relocate operation result types into core/operationsrefactor(gateway): relocate parameter result types into core/configurationrefactor(gateway): relocate FaultResult into core/faultsfeat(gateway): introduce neutral Transport interfaces under core/transportsrefactor(gateway): route DataAccessManager through TopicTransport adapterrefactor(gateway): route OperationManager through Service/Action transportsrefactor(gateway): route ConfigurationManager through ParameterTransportrefactor(gateway): route FaultManager through FaultServiceTransportrefactor(gateway): route LogManager through LogSource adapterrefactor(gateway): route TriggerManager through TopicSubscriptionTransportrefactor(gateway): convert runtime discovery to IntrospectionProviderrefactor: relocate TypeIntrospection to ros2_medkit_serializationrefactor(gateway): drive discovery refresh from rclcpp graph eventsTwo functional fixes, one doc sweep, and a small clang-tidy / formatting
follow-up were folded into the branch during verification:
fix(gateway): only sweep orphan triggers on the backstop refresh- thegraph-event refresh path was running the orphan-trigger sweep during the
cold-start window, before DDS discovery had populated the entity cache.
Restored persistent triggers were then treated as orphans and removed from
the SQLite store. Sweep now runs only from the backstop tick, where the
cache is guaranteed to reflect a settled DDS view.
fix(opcua): bump test_opcua_plugin TIMEOUT to 240s- the suite legitimatelyneeds ~90 s wallclock for its 13 OPC UA connection-failure tests; the
default 60 s timeout truncated the run mid-suite under heavy parallel
colcon test load.
docs(gateway): refresh design index for managers-in-core layoutupdatesthe layered-library note, the TriggerTopicSubscriber class summary, and
the TriggerManager arrow on the class diagram so the design document
matches the new manager / transport split.
fix(gateway): resolve clang-tidy findings on touched filesand a smallfollow-up commit address every clang-tidy warning the incremental PR
job emits on the cpp files this branch modifies (named-parameter,
std::bind-> lambda, single-characterfind(), copiedshared_ptrparameters,vector::reservebeforeemplace_back,and one
operator+=rewrite infault_handlers).